{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Input HMP\n",
    "This notebook pulls the HMP accelerometer sensor data classification data set"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%%bash\n",
    "export version=`python --version |awk '{print $2}' |awk -F\".\" '{print $1$2}'`\n",
    "\n",
    "echo $version\n",
    "\n",
    "if [ $version == '36' ] || [ $version == '37' ]; then\n",
    "    echo 'Starting installation...'\n",
    "    pip3 install pyspark==2.4.8 wget==3.2 > install.log 2> install.log\n",
    "    if [ $? == 0 ]; then\n",
    "        echo 'Please <<RESTART YOUR KERNEL>> (Kernel->Restart Kernel and Clear All Outputs)'\n",
    "    else\n",
    "        echo 'Installation failed, please check log:'\n",
    "        cat install.log\n",
    "    fi\n",
    "elif [ $version == '38' ] || [ $version == '39' ]; then\n",
    "    pip3 install pyspark==3.1.2 wget==3.2 > install.log 2> install.log\n",
    "    if [ $? == 0 ]; then\n",
    "        echo 'Please <<RESTART YOUR KERNEL>> (Kernel->Restart Kernel and Clear All Outputs)'\n",
    "    else\n",
    "        echo 'Installation failed, please check log:'\n",
    "        cat install.log\n",
    "    fi\n",
    "else\n",
    "    echo 'Currently only python 3.6, 3.7 , 3.8 and 3.9 are supported, in case you need a different version please open an issue at https://github.com/IBM/claimed/issues'\n",
    "    exit -1\n",
    "fi"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import fnmatch\n",
    "import os\n",
    "from pathlib import Path\n",
    "from pyspark import SparkConf\n",
    "from pyspark import SparkContext\n",
    "from pyspark.sql.functions import lit\n",
    "from pyspark.sql import SparkSession\n",
    "from pyspark.sql.types import IntegerType\n",
    "from pyspark.sql.types import StructField\n",
    "from pyspark.sql.types import StructType\n",
    "import random\n",
    "import re\n",
    "import shutil\n",
    "import sys"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# path and file name for output (default: data.csv)\n",
    "data_csv = os.environ.get('data_csv', 'data.csv')\n",
    "\n",
    "# url of master (default: local mode)\n",
    "master = os.environ.get('master', \"local[*]\")\n",
    "\n",
    "# temporal data storage for local execution\n",
    "data_dir = os.environ.get('data_dir', '../../data/')\n",
    "\n",
    "# sample on input data to increase processing speed 0..1 (default: 1.0)\n",
    "sample = os.environ.get('sample', '1.0')"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# override parameters received from a potential call using %run magic\n",
    "parameters = list(\n",
    "    map(\n",
    "        lambda s: re.sub('$', '\"', s),\n",
    "        map(\n",
    "            lambda s: s.replace('=', '=\"'),\n",
    "            filter(\n",
    "                lambda s: s.find('=') > -1,\n",
    "                sys.argv\n",
    "            )\n",
    "        )\n",
    "    )\n",
    ")\n",
    "\n",
    "for parameter in parameters:\n",
    "    exec(parameter)\n",
    "\n",
    "# cast parameters to appropriate type\n",
    "sample = float(sample)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Lets create a local spark context (sc) and session (spark)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "sc = SparkContext.getOrCreate(SparkConf().setMaster(master))\n",
    "\n",
    "spark = SparkSession \\\n",
    "    .builder \\\n",
    "    .getOrCreate()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Lets pull the data in raw format from the source (github)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "!rm -Rf HMP_Dataset\n",
    "!git clone https://github.com/wchill/HMP_Dataset"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "schema = StructType([\n",
    "    StructField(\"x\", IntegerType(), True),\n",
    "    StructField(\"y\", IntegerType(), True),\n",
    "    StructField(\"z\", IntegerType(), True)])"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "This step takes a while, it parses through all files and folders and creates a temporary dataframe for each file which gets appended to an overall data-frame \"df\". In addition, a column called \"class\" is added to allow for straightforward usage in Spark afterwards in a supervised machine learning scenario for example."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "scrolled": true,
    "tags": []
   },
   "outputs": [],
   "source": [
    "d = 'HMP_Dataset/'\n",
    "\n",
    "# filter list for all folders containing data (folders that don't start with .)\n",
    "file_list_filtered = [s for s in os.listdir(d)\n",
    "                      if os.path.isdir(os.path.join(d, s)) & ~fnmatch.fnmatch(s, '.*')]\n",
    "\n",
    "# create pandas data frame for all the data\n",
    "\n",
    "df = None\n",
    "\n",
    "for category in file_list_filtered:\n",
    "    data_files = os.listdir('HMP_Dataset/' + category)\n",
    "\n",
    "    # create a temporary pandas data frame for each data file\n",
    "    for data_file in data_files:\n",
    "        if sample < 1.0:\n",
    "            if random.random() > sample:\n",
    "                print('Skipping: ' + data_file)\n",
    "                continue\n",
    "        print(data_file)\n",
    "        temp_df = spark.read. \\\n",
    "            option(\"header\", \"false\"). \\\n",
    "            option(\"delimiter\", \" \"). \\\n",
    "            csv('HMP_Dataset/' + category + '/' + data_file, schema=schema)\n",
    "\n",
    "        # create a column called \"source\" storing the current CSV file\n",
    "        temp_df = temp_df.withColumn(\"source\", lit(data_file))\n",
    "\n",
    "        # create a column called \"class\" storing the current data folder\n",
    "        temp_df = temp_df.withColumn(\"class\", lit(category))\n",
    "\n",
    "        if df is None:\n",
    "            df = temp_df\n",
    "        else:\n",
    "            df = df.union(temp_df)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Lets write the dataf-rame to a file in \"CSV\" format, this will also take quite some time:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "if Path(data_dir + data_csv).exists():\n",
    "    shutil.rmtree(data_dir + data_csv)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "df.write.option(\"header\", \"true\").csv(data_dir + data_csv)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Now we should have a CSV file with our contents"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.8.6"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 4
}
